/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.nexmark.queries.sql;



import com.bff.gaia.unified.sdk.extensions.sql.SqlTransform;

import com.bff.gaia.unified.sdk.nexmark.NexmarkConfiguration;

import com.bff.gaia.unified.sdk.nexmark.model.Bid;

import com.bff.gaia.unified.sdk.nexmark.model.Event;

import com.bff.gaia.unified.sdk.nexmark.model.sql.SelectEvent;

import com.bff.gaia.unified.sdk.nexmark.queries.NexmarkQueryTransform;

import com.bff.gaia.unified.sdk.nexmark.queries.NexmarkQueryUtil;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.schemas.transforms.Convert;

import com.bff.gaia.unified.sdk.transforms.Filter;

import com.bff.gaia.unified.sdk.values.*;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionTuple;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TupleTag;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



/** Basic stream enrichment: join a stream to a bounded side input. */

public class SqlBoundedSideInputJoin extends NexmarkQueryTransform<Bid> {

  private final String query;



  public SqlBoundedSideInputJoin(NexmarkConfiguration configuration) {

    super("SqlBoundedSideInputJoin");



    // Notes on the sensitivities of our parsing and planning:

    //  - cannot directly join MOD(bidder, x) = side.id because only equijoins on col refs allowed,

    //    so we need a WITH clause or subquery

    //  - must have the CAST inside the WITH clause for the same reason, otherwise the cast

    //    occurs in the join condition CAST(side_id AS BIGINT) = side.id

    query =

        String.format(

            "WITH bid_with_side (auction, bidder, price, dateTime, extra, side_id) AS (%n"

                + "  SELECT *, CAST(MOD(bidder, %d) AS BIGINT) side_id FROM bid%n"

                + ")%n"

                + " SELECT bid_with_side.auction%n"

                + ", bid_with_side.bidder%n"

                + ", bid_with_side.price%n"

                + ", bid_with_side.dateTime%n"

                + ", side.extra%n"

                + " FROM bid_with_side, side%n"

                + " WHERE bid_with_side.side_id = side.id",

            configuration.sideInputRowCount);

  }



  @Override

  public boolean needsSideInput() {

    return true;

  }



  @Override

  public PCollection<Bid> expand(PCollection<Event> events) {

    PCollection<Row> bids =

        events

            .apply(Filter.by(NexmarkQueryUtil.IS_BID))

            .apply(getName() + ".SelectEvent", new SelectEvent(Event.Type.BID));



    checkState(getSideInput() != null, "Configuration error: side input is null");



    TupleTag<Row> sideTag = new TupleTag<Row>("side") {};

    TupleTag<Row> bidTag = new TupleTag<Row>("bid") {};



    Schema schema =

        Schema.of(

            Schema.Field.of("id", Schema.FieldType.INT64),

            Schema.Field.of("extra", Schema.FieldType.STRING));



    PCollection<Row> sideRows =

        getSideInput()

            .setSchema(

                schema,

                kv -> Row.withSchema(schema).addValues(kv.getKey(), kv.getValue()).build(),

                row -> KV.of(row.getInt64("id"), row.getString("extra")))

            .apply("SideToRows", Convert.toRows());



    return PCollectionTuple.of(bidTag, bids)

        .and(sideTag, sideRows)

        .apply(SqlTransform.query(query))

        .apply("ResultToBid", Convert.fromRows(Bid.class));

  }

}